package com.xiyun.easyapi.config.cdc;

import cn.hutool.json.JSON;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiyun.EasyApiApplication;
import com.xiyun.easyapi.config.util.Log;
import lombok.SneakyThrows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Spring component that launches an embedded Flink job streaming MySQL change
 * events (via the Flink CDC {@link MySqlSource}) into {@link CustomSink}.
 *
 * <p>The bean is only registered when the {@code stg} profile is active. Connection
 * settings are injected from the {@code spring.datasource.*} properties through the
 * instance setters below, which copy them into static fields so that the static
 * {@link #init()} method can read them.
 */
@Component
@Profile("stg")
@SuppressWarnings("all")
public class CdcServer {

    /** Suffix appended to the database name to build the fully qualified captured table. */
    private static final String TABLE_SUFFIX = ".cron";
    /** Checkpoint interval handed to Flink, in milliseconds (one hour). */
    private static final long CHECKPOINT_INTERVAL_MS = 60L * 60L * 1000L;

    private static String host;
    private static Integer port;
    private static String username;
    private static String password;
    private static String database;

    /**
     * Submits the CDC job to the application thread pool.
     *
     * <p>The job is built and executed inside the submitted task, so this method
     * returns immediately. Any exception raised while building or running the job is
     * caught inside the task and reported through {@code Log.error}.
     *
     * <p>NOTE(review): JSR-250 states that {@code @PostConstruct} methods should not be
     * static; the modifier is kept so existing {@code CdcServer.init()} callers keep
     * compiling. Confirm the container in use invokes static lifecycle methods.
     */
    @PostConstruct
    @SneakyThrows
    public static void init() {
        EasyApiApplication.pool.submit(() -> {
            try {
                MySqlSource<String> source = MySqlSource.<String>builder()
                        .hostname(host)
                        .port(port)
                        .username(username)
                        .password(password)
                        .databaseList(database)
                        .tableList(database + TABLE_SUFFIX)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(false)
                        .build();
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
                env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
                env.fromSource(source,
                                WatermarkStrategy.noWatermarks(),
                                "mysql")
                        .setParallelism(1)
                        .addSink(new CustomSink())
                        .name("mysql_custom_sink");
                // Blocks this pool thread for as long as the job runs.
                env.execute("mysql_job");
            } catch (Exception e) {
                Log.error("CDC服务启动失败", e);
            }
        });
    }

    /**
     * Stores the MySQL host name used by {@link #init()}.
     *
     * @param host value of {@code spring.datasource.host}
     */
    @Value("${spring.datasource.host}")
    public void setHost(String host) {
        CdcServer.host = host;
    }

    /**
     * Stores the MySQL port used by {@link #init()}.
     *
     * @param port value of {@code spring.datasource.port}
     */
    @Value("${spring.datasource.port}")
    public void setPort(Integer port) {
        CdcServer.port = port;
    }

    /**
     * Stores the MySQL user name used by {@link #init()}.
     *
     * @param username value of {@code spring.datasource.username}
     */
    @Value("${spring.datasource.username}")
    public void setUsername(String username) {
        CdcServer.username = username;
    }

    /**
     * Stores the MySQL password used by {@link #init()}.
     *
     * @param password value of {@code spring.datasource.password}
     */
    @Value("${spring.datasource.password}")
    public void setPassword(String password) {
        CdcServer.password = password;
    }

    /**
     * Stores the database name used by {@link #init()}; it is both the captured
     * database and the prefix of the captured table name.
     *
     * @param database value of {@code spring.datasource.database}
     */
    @Value("${spring.datasource.database}")
    public void setDatabase(String database) {
        CdcServer.database = database;
    }

    /**
     * Sink that receives each change event as a JSON string and dispatches on its
     * {@code op} field: {@code "c"} (insert), {@code "u"} (update), {@code "d"} (delete).
     * Only updates currently produce output; inserts and deletes are placeholders.
     */
    public static class CustomSink extends RichSinkFunction<String> {

        /**
         * Handles one change event.
         *
         * <p>For an update, the {@code before} and {@code after} row images are compared
         * column by column and the list of differing columns is printed to standard
         * output. Events with a missing or unrecognised {@code op} are ignored.
         *
         * @param json    the change event serialized as JSON
         * @param context sink context supplied by Flink (unused)
         * @throws Exception if the event cannot be parsed
         */
        @Override
        public void invoke(String json, Context context) throws Exception {
            JSON event = JSONUtil.parse(json);
            Object op = event.getByPath("$.op");
            if ("c".equals(op)) {
                // Insert: the new row is available at "$.after"; no handling yet.
            } else if ("u".equals(op)) {
                // Update: report every column whose value differs between the two row images.
                JSONObject after = toRowImage(event.getByPath("$.after"));
                JSONObject before = toRowImage(event.getByPath("$.before"));
                List<String> changes = describeChanges(before, after);
                System.out.println("更新数据：" + changes);
            } else if ("d".equals(op)) {
                // Delete: the removed row is available at "$.before"; no handling yet.
            }
        }

        /**
         * Converts a row-image node into a {@link JSONObject}, substituting an empty
         * object when the node is absent, JSON null, or not an object, so callers never
         * have to null-check.
         */
        private static JSONObject toRowImage(Object node) {
            if (JSONUtil.isNull(node)) {
                return new JSONObject();
            }
            JSON parsed = JSONUtil.parse(node);
            return parsed instanceof JSONObject ? (JSONObject) parsed : new JSONObject();
        }

        /**
         * Builds one message per column whose string representation differs between
         * the two row images. Columns present in only one image are included, with the
         * missing side rendered as {@code null}.
         */
        private static List<String> describeChanges(JSONObject before, JSONObject after) {
            // Union of both key sets: a column can be absent from one image, for
            // example when its value was null and the parser dropped the key.
            Set<String> columns = new LinkedHashSet<>(after.keySet());
            columns.addAll(before.keySet());

            List<String> changes = new ArrayList<>();
            for (String column : columns) {
                Object oldValue = before.get(column);
                Object newValue = after.get(column);
                // String.valueOf is null-safe and renders a missing value as "null".
                if (!String.valueOf(newValue).equals(String.valueOf(oldValue))) {
                    changes.add("更新数据：" + column + "：" + oldValue + "更新为" + newValue);
                }
            }
            return changes;
        }
    }
}
